#!/usr/bin/env python

# BEGIN_COPYRIGHT
#
# Copyright 2009-2018 CRS4.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# END_COPYRIGHT

"""
Build a catalog of file system objects.
"""

import hashlib
import pydoop.hdfs as hdfs

FIELDS = 'name owner group size permissions last_mod md5'.split()
AVE_REC_SIZE = 200  # estimated average catalog record size (bytes)
CHUNK_SIZE = 2**26  # read files in 64 MiB chunks when computing digests


def pack_info(info):
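    """Serialize a path info dictionary as a tab-separated record."""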
    return '\t'.join(str(info.get(f)) for f in FIELDS)


def unpack_info(line):
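    """Inverse of pack_info: rebuild the info dictionary from a record."""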
    return dict(zip(FIELDS, line.split('\t')))


def compute_md5(fname, fs=None):
    """Compute the MD5 digest of fname, reading it in CHUNK_SIZE blocks."""
    host, port, path = hdfs.path.split(fname)
    close_fs = fs is None
    if fs is None:
        fs = hdfs.hdfs(host, port)
    m = hashlib.md5()
    try:
        with fs.open_file(path) as f:
            data = f.read(CHUNK_SIZE)
            while data:
                m.update(data)
                data = f.read(CHUNK_SIZE)
    finally:
        if close_fs:
            fs.close()  # close the connection only if we opened it here
    return m.hexdigest()


def isdir(fs, d):
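    """Return True if d exists on fs and is a directory."""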
    try:
        info = fs.get_path_info(d)
    except IOError:
        return False
    return info['kind'] == 'directory'


class Scanner(object):
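    """
    Breadth-first scanner of the tree rooted at root.

    Each call to go_deeper expands the scan by one directory level:
    plain files are appended to catalog, while subdirectories become
    child scanners.  leaves() returns the roots of the subtrees that
    have not been explored yet.
    """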

    def __init__(self, fs, root, catalog, hash=False):
        #-- <pre-requisites>
        root_info = fs.get_path_info(root)
        assert root_info['kind'] == 'directory'
        #-- </pre-requisites>
        self.fs = fs
        self.root = root
        self.catalog = catalog
        self.hash = hash
        self.children = []
        self.depth = 0

    def leaves(self):
        if self.depth > 0:
            return [leaf for c in self.children for leaf in c.leaves()]
        else:
            return [self.root]

    def can_go_deeper(self):
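        """Return True if some part of the tree is still unexplored."""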
        if self.depth == 0:
            return True
        for child in self.children:
            if child.can_go_deeper():
                return True
        return False

    def scan_level_zero(self):
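        """List root: record files, create child scanners for subdirs."""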
        self.children = []
        for info in self.fs.list_directory(self.root):
            if info['kind'] == 'directory':
                child = Scanner(self.fs, info['name'], self.catalog,
                                hash=self.hash)
                self.children.append(child)
            else:
                info['md5'] = (compute_md5(info['name'], self.fs)
                               if self.hash else '')
                self.catalog.append(info)

    def go_deeper(self):
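        """Expand the scan frontier by one directory level."""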
        if self.depth == 0:
            self.scan_level_zero()
        else:
            for child in self.children:
                child.go_deeper()
        self.depth += 1


def build_catalog(fs, root, catalog, max_depth=None, max_size=None,
                  hash=False):
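    """
    Scan the tree rooted at root one level at a time, stopping when it
    has been fully explored or when max_depth / max_size is exceeded.
    Return the scanner, whose leaves() are the unexplored subtrees.
    """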
    scanner = Scanner(fs, root, catalog, hash=hash)
    while (scanner.can_go_deeper()
           and (max_size is None or len(catalog) <= max_size)
           and (max_depth is None or scanner.depth < max_depth)):
        scanner.go_deeper()
    return scanner


#-- pydoop script mapper and driver -------------------------------------------
class Catalog(object):
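    """Catalog that emits appended entries through a pydoop script writer."""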

    def __init__(self, writer):
        self.writer = writer

    def append(self, x):
        self.writer.emit('', pack_info(x))


def catalog_mapper(_, root, writer):
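    """Pydoop script mapper: each input value is a directory to catalog."""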
    root = root.strip()
    catalog = Catalog(writer)
    host, port, path = hdfs.path.split(root)
    fs = hdfs.hdfs(host, port)
    build_catalog(fs, path, catalog)
    fs.close()


def hash_mapper(_, pinfo, writer):
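    """Pydoop script mapper: re-key each packed record by its file name."""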
    info = unpack_info(pinfo.strip())
    writer.emit(info['name'], pinfo)


def hash_reducer(fname, lpinfo, writer, jc):
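    """Pydoop script reducer: compute the file's MD5 and emit its record."""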
    prefix = jc.get('fscanner.path.prefix', '')
    info = unpack_info(next(lpinfo))
    writer.status('processing {}'.format(info['name']))
    info['md5'] = compute_md5(info['name'])
    if prefix != '':
        info['name'] = info['name'].replace(prefix, "./", 1)
    writer.emit('', pack_info(info))
#------------------------------------------------------------------------------


def make_parser():
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument('-d', '--directory', metavar='DIR',
                        help='top dir URL (use file:// for a local directory)')
    parser.add_argument('--max-depth', metavar='MAX_DEPTH',
                        type=int,
                        default=-1,
                        help='maximum local scan depth (-1: no limit)')
    parser.add_argument('--max-size', metavar='MAX_SIZE',
                        type=int,
                        default=-1,
                        help='maximum catalog size (-1: no limit)')
    parser.add_argument('--n-tasks', metavar='N_TASKS',
                        type=int,
                        default=-1,
                        help='number of tasks for the Hadoop job')
    parser.add_argument('--cat-file', metavar='CAT_FILE',
                        default='catalog.list',
                        help='output catalog file name')
    parser.add_argument("--hadoop", help="will fork off an hadoop if needed",
                        action="store_true")
    parser.add_argument('--hadoop-dir', metavar='HADOOP_DIR',
                        default='fscanner',
                        help='hadoop output dir')
    parser.add_argument("--hash", help="will compute hash",
                        action="store_true")
    return parser


def gather_catalog(root, max_depth, max_size, hash, cat_fname):
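    """
    Scan root locally, dump the catalog entries found within the
    max_depth / max_size limits to cat_fname and return the roots of
    the subtrees left unexplored.
    """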
    logger = logging.getLogger("gather_catalog")
    logger.info('building file list starting from {}'.format(root))
    host, port, path = hdfs.path.split(root)
    fs = hdfs.hdfs(host, port)
    if not isdir(fs, path):
        sys.exit("%r does not exist" % root)
    fs.set_working_directory(path)
    root = "{}".format(fs.working_directory())
    logger.info('root: {}'.format(root))
    catalog = []
    scanner = build_catalog(fs, root, catalog, max_depth, max_size, hash)
    fs.close()
    with open(cat_fname, 'w') as o:
        if len(catalog) > 0:
            logger.info('dumping {} catalog entries'.format(len(catalog)))
            for x in catalog:
                o.write(pack_info(x) + '\n')
    logger.info('done.')
    return scanner.leaves()


def main(argv=None):
    logger = logging.getLogger("main")
    parser = make_parser()
    args = parser.parse_args(argv)
    if not args.directory:
        parser.print_help()
        sys.exit(1)
    root = args.directory
    cat_fname = args.cat_file
    max_depth = args.max_depth if args.max_depth >= 0 else None
    max_size = args.max_size if args.max_size >= 0 else None
    logger.debug('args.hash: {}'.format(args.hash))
    if not args.hadoop:
        lvs = gather_catalog(root, max_depth, max_size, args.hash,
                             cat_fname)
        logger.info('there are {} leaves left.'.format(len(lvs)))
    else:
        hroot = args.hadoop_dir
        n_tasks = args.n_tasks
        runner = PydoopScriptRunner(hroot, logger=logger)
        fd, tmp_cat_file = tempfile.mkstemp()
        os.close(fd)  # keep only the path; the file is reopened by name
        leaves = gather_catalog(root, max_depth, max_size, False,
                                tmp_cat_file)
        if len(leaves) > 0:
            logger.info('using hadoop to fully explore dir tree.')
            fd, roots_file = tempfile.mkstemp()
            os.close(fd)  # keep only the path
            with open(roots_file, 'w') as o:
                for leaf in leaves:
                    o.write(leaf + '\n')
            runner.set_input(roots_file, put=True)
            runner.run(__file__, more_args=[
                '--map-fn', 'catalog_mapper',
                '--num-reducers', '0',
                '--kv-separator', '',
                '-D', 'mapred.map.tasks={}'.format(n_tasks),
            ])
            os.unlink(roots_file)
        else:
            out_dir = hdfs.path.join(runner.wd, 'out_info')
            hdfs.mkdir(out_dir)
            runner.set_output(out_dir)
        hdfs.put(tmp_cat_file, runner.output)
        os.unlink(tmp_cat_file)  # remove the local temporary copy

        prefix = os.path.dirname(root)
        if args.hash:
            logger.info('using hadoop to gather file info.')
            out_dir = hdfs.path.join(runner.wd, 'out_hashed')
            runner.set_input(runner.output)
            runner.set_output(out_dir)
            logger.debug('dumping to {}'.format(out_dir))
            runner.run(__file__, more_args=[
                '--map-fn', 'hash_mapper',
                '--reduce-fn', 'hash_reducer',
                '--num-reducers', str(n_tasks),
                '--kv-separator', '',
                '-D', 'fscanner.path.prefix={}'.format(prefix),
            ])
        with open(cat_fname, 'w') as o:
            o.write('\t'.join(FIELDS) + '\n')
        runner.collect_output(cat_fname)
        runner.clean()


if __name__ == "__main__":
    import sys
    import os
    import logging
    import tempfile
    import argparse
    logging.basicConfig(level=logging.DEBUG)
    from pydoop.hadut import PydoopScriptRunner
    main()
